package edu.ucla.cs.rpc.multicast.network;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import edu.ucla.cs.rpc.multicast.handlers.MessageHandler;
import edu.ucla.cs.rpc.multicast.network.message.Message;
import edu.ucla.cs.rpc.multicast.network.message.Message.MessageType;

/**
 * MulticastManager maintains group membership lists for senders and receivers,
 * and notifies each member of changes in group membership. This is necessary
 * because since Java does not provide reliable, in-order multicast primitives,
 * they must be simulated with TCP connections from a sender to every receiver
 * when a message is sent.
 * 
 * @author Chase Covello Philip Russell
 * 
 */
public class MulticastManager {

	/**
	 * This handler receives messages from senders and receivers whenever they
	 * join or leave the group. Both senders and receivers are notified when a
	 * receiver joins, and receivers are notified when a sender joins.
	 * 
	 * @author Chase Covello Philip Russell
	 * 
	 */
	private class MulticastManagerMessageHandler implements MessageHandler {

		/**
		 * Receive join/leave messages and update the appropriate map.
		 */
		public void receive(Message message) {
			try {
				SocketAddress source = message.getSource();
				Long senderID = message.getSenderID();

				if (message.getType() == MessageType.JOIN_RECEIVERS) {
					forward(senders, message);
					forward(receivers, message);
					sendIDs(source);
					synchronized (receivers) {
						receivers.put(senderID, source);
					}
				} else if (message.getType() == MessageType.LEAVE_RECEIVERS) {
					synchronized (receivers) {
						receivers.remove(senderID);
					}
					forward(senders, message);
					forward(receivers, message);
				} else if (message.getType() == MessageType.JOIN_SENDERS) {
					forward(receivers, message);
					sendReceiverSocketAddresses(source);
					synchronized (senders) {
						senders.put(senderID, source);
					}
				} else if (message.getType() == MessageType.LEAVE_SENDERS) {
					synchronized (senders) {
						senders.remove(senderID);
					}
					forward(receivers, message);
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}

	}

	private MessageReceiver incoming;

	private Map<Long, SocketAddress> receivers;

	private Map<Long, SocketAddress> senders;

	/**
	 * Constructs a new MulticastManager that listens on a dynamically-assigned
	 * port.
	 * 
	 * @throws IOException
	 *             in the case of unrecoverable network errors.
	 */
	public MulticastManager() throws IOException {
		incoming = new MessageReceiver(new MulticastManagerMessageHandler());

		senders = new HashMap<Long, SocketAddress>();
		receivers = new HashMap<Long, SocketAddress>();
	}

	/**
	 * Forwards the given message to every member of the group.
	 * 
	 * @param message
	 *            the message to forward.
	 * @throws IOException
	 *             in the case of unrecoverable network errors.
	 */
	private void forward(Map<Long, SocketAddress> group, Message message)
			throws IOException {
		synchronized (group) {
			for (SocketAddress receiver : group.values())
				message.send(receiver);
		}
	}

	/**
	 * Returns the multicast manager's socket address.
	 * 
	 * @return the manager's address.
	 */
	public SocketAddress getSocketAddress() {
		return incoming.getSocketAddress();
	}

	/**
	 * Sends the node IDs of all group members to the given receiver.
	 * 
	 * @param receiver
	 *            the receiver to notify.
	 * @throws IOException
	 *             in the case of unrecoverable network errors.
	 */
	private void sendIDs(SocketAddress receiver) throws IOException {
		synchronized (receivers) {
			for (Entry<Long, SocketAddress> entry : receivers.entrySet()) {
				Message m = new Message(MessageType.JOIN_RECEIVERS, entry
						.getValue());
				m.setSenderID(entry.getKey());
				m.send(receiver);
			}
		}
		synchronized (senders) {
			for (Entry<Long, SocketAddress> entry : senders.entrySet()) {
				Message m = new Message(MessageType.JOIN_SENDERS, entry
						.getValue());
				m.setSenderID(entry.getKey());
				m.send(receiver);
			}
		}
	}

	/**
	 * Sends the socket addresses of all receivers in the group to the given
	 * sender.
	 * 
	 * @param sender
	 *            the sender to notify.
	 * @throws IOException
	 *             in the case of unrecoverable network errors.
	 */
	private void sendReceiverSocketAddresses(SocketAddress sender)
			throws IOException {
		synchronized (receivers) {
			for (SocketAddress receiver : receivers.values())
				new Message(MessageType.JOIN_RECEIVERS, receiver).send(sender);
		}
	}

	/**
	 * Shuts down any threads created by this multicast manager. This method
	 * <b>must</b> be called before disposing of all references to this object;
	 * otherwise, its thread will continue to run until the application is
	 * forcibly terminated.
	 */
	public void shutdown() {
		incoming.shutdown();
	}
}
